diff --git a/CMakeLists.txt b/CMakeLists.txt index 86346c68e6..bbc023a22b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -902,6 +902,7 @@ if(NOT WITH_API_ONLY) if(WITH_FUNC_TESTS) add_subdirectory(functional) endif() + add_subdirectory(stress) endif() include(cmake/opentelemetry-build-external-component.cmake) diff --git a/stress/CMakeLists.txt b/stress/CMakeLists.txt new file mode 100644 index 0000000000..a1184b2513 --- /dev/null +++ b/stress/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +# Add subdirectories for common and metrics components +add_subdirectory(common) +add_subdirectory(metrics) diff --git a/stress/common/CMakeLists.txt b/stress/common/CMakeLists.txt new file mode 100644 index 0000000000..de8b0200c8 --- /dev/null +++ b/stress/common/CMakeLists.txt @@ -0,0 +1,14 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +add_library(stress STATIC stress.cc) + +# Include directory for the throughput library +target_include_directories(stress PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) + +# Set C++ standard +set_target_properties( + stress + PROPERTIES CXX_STANDARD 17 + CXX_STANDARD_REQUIRED YES + CXX_EXTENSIONS NO) diff --git a/stress/common/stress.cc b/stress/common/stress.cc new file mode 100644 index 0000000000..4d53dd37aa --- /dev/null +++ b/stress/common/stress.cc @@ -0,0 +1,142 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "stress.h" + +// Global flags +std::atomic STOP( + false); // Global flag to stop the stress test when signaled (e.g., via Ctrl+C) +std::atomic READY(false); // Global flag to synchronize thread start + +// StressTest constructor +Stress::Stress(std::function func, size_t numThreads) + : func_(std::move(func)), stats_(numThreads), numThreads_(numThreads) +{} + +// Main function to start the stress test +void Stress::run() +{ + std::cout << "Starting stress test with " << numThreads_ << " threads...\n"; + auto startTime = std::chrono::steady_clock::now(); + + READY.store(false, std::memory_order_release); + + std::thread controllerThread(&Stress::monitorThroughput, this); + + threads_.reserve(numThreads_); + for (size_t i = 0; i < numThreads_; ++i) + { + threads_.emplace_back(&Stress::workerThread, this, i); + } + + READY.store(true, std::memory_order_release); + + for (auto &thread : threads_) + { + if (thread.joinable()) + { + thread.join(); + } + } + + if (controllerThread.joinable()) + { + controllerThread.join(); + } + + auto endTime = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(endTime - startTime); + + uint64_t totalCount = 0; + for (const auto &stat : stats_) + { + totalCount += stat.count.load(std::memory_order_relaxed); + } + + std::cout << "\nTest completed:\n" + << "Total iterations: " << formatNumber(totalCount) << "\n" + << "Duration: " << duration.count() << " seconds\n" + << "Average throughput: " << formatNumber(totalCount / duration.count()) + << " iterations/sec\n"; +} + +// Worker thread function +void Stress::workerThread(size_t threadIndex) +{ +#ifdef __linux__ + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(threadIndex % std::thread::hardware_concurrency(), &cpuset); + pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset); +#endif + + while (!STOP.load(std::memory_order_acquire)) + { + func_(); + stats_[threadIndex].count.fetch_add(1, std::memory_order_relaxed); + } +} + +// Monitoring thread function +void Stress::monitorThroughput() +{ + uint64_t lastTotalCount = 0; + auto lastTime = std::chrono::steady_clock::now(); + std::vector throughputHistory; + + while (!STOP.load(std::memory_order_acquire)) + { + std::this_thread::sleep_for(std::chrono::seconds(SLIDING_WINDOW_SIZE)); + + auto currentTime = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(currentTime - lastTime).count(); + + uint64_t totalCount = 0; + for (const auto &stat : stats_) + { + totalCount += stat.count.load(std::memory_order_relaxed); + } + + uint64_t currentCount = totalCount - lastTotalCount; + lastTotalCount = totalCount; + lastTime = currentTime; + + if (elapsed > 0) + { + uint64_t throughput = currentCount / elapsed; + throughputHistory.push_back(throughput); + + double avg = 0; + uint64_t min = throughput; + uint64_t max = throughput; + + for (uint64_t t : throughputHistory) + { + avg += t; + min = std::min(min, t); + max = std::max(max, t); + } + avg /= throughputHistory.size(); + + std::cout << "\rThroughput: " << formatNumber(throughput) + << " it/s | Avg: " << formatNumber(static_cast(avg)) + << " | Min: " << formatNumber(min) << " | Max: " << formatNumber(max) << std::flush; + } + } + std::cout << std::endl; +} + +// Helper function to format numbers with commas +std::string Stress::formatNumber(uint64_t num) +{ + std::ostringstream oss; + oss.imbue(std::locale("")); + oss << std::fixed << num; + return oss.str(); +} + +// Signal handler to set the STOP flag when receiving a termination signal +void Stress::stop() +{ + STOP.store(true, std::memory_order_release); +} diff --git a/stress/common/stress.h b/stress/common/stress.h new file mode 100644 index 0000000000..3b2401104e --- /dev/null +++ b/stress/common/stress.h @@ -0,0 +1,99 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +/** + * A multi-threaded stress test framework to measure throughput and performance of a given workload. + * + * ## Overview + * Multi-threaded stress test framework designed to execute a specified function + * in parallel across multiple threads and measure its throughput. The results are displayed + * dynamically, including current throughput, average throughput, and minimum/maximum throughput + * during the test. + * + * ## Key Features + * - **Multi-threading**: Uses std::thread to execute the workload in parallel across a user-defined + * number of threads. + * - **Thread Safety**: Tracks iteration counts per thread using an aligned and padded structure + * (WorkerStats) to avoid false sharing and ensure efficient thread-safe updates. + * - **Dynamic Metrics**: Continuously calculates and displays throughput (iterations/sec) over + * sliding time windows. + * - **Graceful Termination**: Captures signals (e.g., Ctrl+C) to cleanly stop all threads and + * summarize the results. + * - **Thread Affinity (Linux-only)**: Optionally binds threads to specific CPU cores for consistent + * performance. + * + * ## Implementation Details + * - **Worker Threads**: + * - Each worker thread executes the workload function (func) in a loop until a global STOP flag + * is set. + * - Each thread maintains its own iteration count to minimize contention. + * + * - **Throughput Monitoring**: + * - A separate controller thread monitors throughput by periodically summing up iteration counts + * across threads. + * - Throughput is calculated over a sliding window (SLIDING_WINDOW_SIZE) and displayed + * dynamically. + * + * - **Thread Synchronization**: + * - The STOP flag, an std::atomic, ensures all threads stop gracefully when signaled. + * - Memory ordering (e.g., std::memory_order_relaxed, std::memory_order_acquire/release) is used + * to optimize performance while maintaining correctness. + * + * - **Final Summary**: + * - At the end of the test, the program calculates and prints the total iterations, duration, and + * average throughput. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// Configuration constants +constexpr uint64_t SLIDING_WINDOW_SIZE = 2; // Time window for throughput calculation (in seconds) +constexpr size_t CACHE_LINE_SIZE = 64; // Typical CPU cache line size for alignment + +// WorkerStats structure for tracking iteration counts per thread +struct alignas(CACHE_LINE_SIZE) WorkerStats +{ + std::atomic count{0}; // Count of iterations for a specific thread + char padding[CACHE_LINE_SIZE - + sizeof(std::atomic)]; // Padding to ensure proper alignment +}; + +// StressTest class +class Stress +{ +public: + // Constructor + Stress(std::function func, size_t numThreads = std::thread::hardware_concurrency()); + + // Main function to start the stress test + void run(); + + // function to stop the test + void stop(); + +private: + std::function func_; // Function to be executed by each thread + std::vector threads_; // Vector to hold worker threads + std::vector stats_; // Vector to hold statistics for each thread + const size_t numThreads_; // Number of threads to run + std::atomic stopFlag_{false}; // signal to stop the test + + // Worker thread function + void workerThread(size_t threadIndex); + + // Monitoring thread function to calculate and display throughput + void monitorThroughput(); + + // Helper function to format numbers with commas for readability + static std::string formatNumber(uint64_t num); +}; diff --git a/stress/metrics/CMakeLists.txt b/stress/metrics/CMakeLists.txt new file mode 100644 index 0000000000..eb8d457d88 --- /dev/null +++ b/stress/metrics/CMakeLists.txt @@ -0,0 +1,30 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +# Define the metrics executable +add_executable(stress_metrics metrics.cc) + +# Link throughput library and OpenTelemetry Metrics API +target_link_libraries( + stress_metrics PRIVATE stress opentelemetry_metrics # OpenTelemetry Metrics + # SDK +) + +# Include directories for throughput +target_include_directories(stress_metrics + PRIVATE ${CMAKE_SOURCE_DIR}/stress/common) + +# Set properties +set_target_properties( + stress_metrics + PROPERTIES CXX_STANDARD 17 + CXX_STANDARD_REQUIRED YES + CXX_EXTENSIONS NO) + +# Optional: Installation +if(OPENTELEMETRY_INSTALL) + install( + TARGETS stress_metrics + EXPORT "${PROJECT_NAME}-target" + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}) +endif() diff --git a/stress/metrics/metrics.cc b/stress/metrics/metrics.cc new file mode 100644 index 0000000000..ac1fe2c9e0 --- /dev/null +++ b/stress/metrics/metrics.cc @@ -0,0 +1,142 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "stress.h" + +#include +#include +#include +#include +#include +#include + +#include "opentelemetry/common/attribute_value.h" +#include "opentelemetry/metrics/meter_provider.h" +#include "opentelemetry/metrics/provider.h" +#include "opentelemetry/nostd/shared_ptr.h" +#include "opentelemetry/sdk/metrics/aggregation/aggregation_config.h" +#include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader_factory.h" +#include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader_options.h" +#include "opentelemetry/sdk/metrics/instruments.h" +#include "opentelemetry/sdk/metrics/meter_provider.h" +#include "opentelemetry/sdk/metrics/meter_provider_factory.h" +#include "opentelemetry/sdk/metrics/metric_reader.h" +#include "opentelemetry/sdk/metrics/push_metric_exporter.h" +#include "opentelemetry/sdk/metrics/state/filtered_ordered_attribute_map.h" +#include "opentelemetry/sdk/metrics/view/instrument_selector.h" +#include "opentelemetry/sdk/metrics/view/instrument_selector_factory.h" +#include "opentelemetry/sdk/metrics/view/meter_selector.h" +#include "opentelemetry/sdk/metrics/view/meter_selector_factory.h" +#include "opentelemetry/sdk/metrics/view/view.h" +#include "opentelemetry/sdk/metrics/view/view_factory.h" + +Stress *globalStressTest = nullptr; + +void signalHandler(int) +{ + if (globalStressTest) + { + globalStressTest->stop(); + } +} + +namespace metrics_sdk = opentelemetry::sdk::metrics; +namespace metrics_api = opentelemetry::metrics; + +namespace +{ +class MockMetricExporter : public opentelemetry::sdk::metrics::PushMetricExporter +{ +public: + opentelemetry::sdk::common::ExportResult Export( + const opentelemetry::sdk::metrics::ResourceMetrics & /*data*/) noexcept override + { + // Ignore all metrics and return success + return opentelemetry::sdk::common::ExportResult::kSuccess; + } + + bool ForceFlush(std::chrono::microseconds /*timeout*/) noexcept override + { + return true; // No-op + } + + bool Shutdown(std::chrono::microseconds /*timeout*/) noexcept override + { + return true; // No-op + } + + opentelemetry::sdk::metrics::AggregationTemporality GetAggregationTemporality( + opentelemetry::sdk::metrics::InstrumentType) const noexcept override + { + return opentelemetry::sdk::metrics::AggregationTemporality::kDelta; + } +}; + +// Pre-generate random attributes +std::vector> GenerateAttributeSet(size_t count) +{ + std::vector> attributes_set; + for (size_t i = 0; i < count; ++i) + { + std::map attributes; + attributes["dim1"] = rand() % 100; // Random value between 0 and 99 + attributes["dim2"] = rand() % 100; // Random value between 0 and 99 + attributes["dim3"] = rand() % 100; // Random value between 0 and 99 + attributes_set.push_back(attributes); + } + return attributes_set; +} + +void InitMetrics(const std::string /*&name*/) +{ + metrics_sdk::PeriodicExportingMetricReaderOptions options; + options.export_interval_millis = std::chrono::milliseconds(1000); + options.export_timeout_millis = std::chrono::milliseconds(500); + auto exporter = std::make_unique(); + auto reader = + metrics_sdk::PeriodicExportingMetricReaderFactory::Create(std::move(exporter), options); + auto provider = metrics_sdk::MeterProviderFactory::Create(); + provider->AddMetricReader(std::move(reader)); + std::shared_ptr api_provider(std::move(provider)); + metrics_api::Provider::SetMeterProvider(api_provider); +} + +void CleanupMetrics() +{ + std::shared_ptr none; + metrics_api::Provider::SetMeterProvider(none); +} + +void CounterExample(opentelemetry::nostd::unique_ptr> &counter, + const std::vector> &attributes_set) +{ + // Pick a random attribute set + size_t random_index = rand() % attributes_set.size(); + const auto &attributes = attributes_set[random_index]; + + // Record the metric with the selected attributes + counter->Add( + 1.0, opentelemetry::common::KeyValueIterableView>(attributes), + opentelemetry::context::Context{}); +} +} // namespace + +int main(int /*argc*/, char ** /*argv[]*/) +{ + std::srand(static_cast(std::time(nullptr))); // Seed the random number generator + // Pre-generate a set of random attributes + size_t attribute_count = 1000; // Number of attribute sets to pre-generate + auto attributes_set = GenerateAttributeSet(attribute_count); + + InitMetrics("metrics_stress_test"); + auto provider = metrics_api::Provider::GetMeterProvider(); + auto meter = provider->GetMeter("metrics_stress_test", "1.0.0"); + auto counter = meter->CreateDoubleCounter("metrics_stress_test_counter"); + auto func = [&counter, &attributes_set]() { CounterExample(counter, attributes_set); }; + Stress stressTest(func, std::thread::hardware_concurrency()); + globalStressTest = &stressTest; + std::signal(SIGINT, signalHandler); + stressTest.run(); + CleanupMetrics(); + return 0; +}