diff --git a/catkit2/bindings.cpp b/catkit2/bindings.cpp
index af2aea18f..311151a9e 100644
--- a/catkit2/bindings.cpp
+++ b/catkit2/bindings.cpp
@@ -36,6 +36,7 @@
 #include "Uuid.h"
 #include "ArrayView.h"
 #include "ProcessStats.h"
+#include "TestHelpers.h"
 
 #include "testbed.pb.h"
 
@@ -1293,6 +1294,67 @@ PYBIND11_MODULE(catkit_bindings, m)
 		.def("acquire", &HybridPoolAllocator::Acquire)
 		.def("release", &HybridPoolAllocator::Release);
 
+	// Stress test helpers for concurrency testing. Python creates the threads; each
+	// run_thread() binding releases the GIL while the C++ loop of that thread runs.
+	py::module_ test_helpers = m.def_submodule("test_helpers", "Concurrency stress test helpers");
+
+	py::class_<TestHelpers::StressTestParams>(test_helpers, "StressTestParams")
+		.def(py::init<>())
+		.def_readwrite("num_threads", &TestHelpers::StressTestParams::num_threads)
+		.def_readwrite("iterations", &TestHelpers::StressTestParams::iterations)
+		.def_readwrite("max_item_shift", &TestHelpers::StressTestParams::max_item_shift)
+		.def_readwrite("max_retained_shift", &TestHelpers::StressTestParams::max_retained_shift)
+		.def_readwrite("alloc_probability", &TestHelpers::StressTestParams::alloc_probability)
+		.def_readwrite("retain_probability", &TestHelpers::StressTestParams::retain_probability)
+		.def_readwrite("free_probability", &TestHelpers::StressTestParams::free_probability)
+		.def_readwrite("transfer_probability", &TestHelpers::StressTestParams::transfer_probability)
+		.def_readwrite("transfer_buffer_size", &TestHelpers::StressTestParams::transfer_buffer_size)
+		.def_readwrite("data_buffer_size", &TestHelpers::StressTestParams::data_buffer_size);
+
+	py::class_<TestHelpers::PoolAllocatorStressContext>(test_helpers, "PoolAllocatorStressContext")
+		.def(py::init<std::shared_ptr<PoolAllocator>, const TestHelpers::StressTestParams&>())
+		.def("run_thread", [](TestHelpers::PoolAllocatorStressContext& self, intptr_t tid)
+		{
+			py::gil_scoped_release release;
+			self.RunThread(tid);
+		})
+		.def_property_readonly("total_allocations", &TestHelpers::PoolAllocatorStressContext::GetTotalAllocations)
+		.def_property_readonly("total_releases", &TestHelpers::PoolAllocatorStressContext::GetTotalReleases)
+		.def_property_readonly("failed_allocations", &TestHelpers::PoolAllocatorStressContext::GetFailedAllocations)
+		.def_property_readonly("failed_releases", &TestHelpers::PoolAllocatorStressContext::GetFailedReleases)
+		.def_property_readonly("double_free_detected", &TestHelpers::PoolAllocatorStressContext::GetDoubleFreeDetected)
+		.def_property_readonly("success", &TestHelpers::PoolAllocatorStressContext::GetSuccess);
+
+	py::class_<TestHelpers::BuddyAllocatorStressContext>(test_helpers, "BuddyAllocatorStressContext")
+		.def(py::init<std::shared_ptr<BuddyAllocator>, const TestHelpers::StressTestParams&>())
+		.def("run_thread", [](TestHelpers::BuddyAllocatorStressContext& self, intptr_t tid)
+		{
+			py::gil_scoped_release release;
+			self.RunThread(tid);
+		})
+		.def_property_readonly("total_allocations", &TestHelpers::BuddyAllocatorStressContext::GetTotalAllocations)
+		.def_property_readonly("total_releases", &TestHelpers::BuddyAllocatorStressContext::GetTotalReleases)
+		.def_property_readonly("failed_allocations", &TestHelpers::BuddyAllocatorStressContext::GetFailedAllocations)
+		.def_property_readonly("failed_releases", &TestHelpers::BuddyAllocatorStressContext::GetFailedReleases)
+		.def_property_readonly("double_free_detected", &TestHelpers::BuddyAllocatorStressContext::GetDoubleFreeDetected)
+		.def_property_readonly("success", &TestHelpers::BuddyAllocatorStressContext::GetSuccess);
+
+	py::class_<TestHelpers::HybridPoolAllocatorStressContext>(test_helpers, "HybridPoolAllocatorStressContext")
+		.def(py::init<std::shared_ptr<HybridPoolAllocator>, const TestHelpers::StressTestParams&>())
+		.def("run_thread", [](TestHelpers::HybridPoolAllocatorStressContext& self, intptr_t tid)
+		{
+			py::gil_scoped_release release;
+			self.RunThread(tid);
+		})
+		.def_property_readonly("total_allocations", &TestHelpers::HybridPoolAllocatorStressContext::GetTotalAllocations)
+		.def_property_readonly("total_releases", &TestHelpers::HybridPoolAllocatorStressContext::GetTotalReleases)
+		.def_property_readonly("failed_allocations", &TestHelpers::HybridPoolAllocatorStressContext::GetFailedAllocations)
+		.def_property_readonly("failed_releases", &TestHelpers::HybridPoolAllocatorStressContext::GetFailedReleases)
+		.def_property_readonly("double_free_detected", &TestHelpers::HybridPoolAllocatorStressContext::GetDoubleFreeDetected)
+		.def_property_readonly("success", &TestHelpers::HybridPoolAllocatorStressContext::GetSuccess);
+
+	test_helpers.def("default_stress_params", &TestHelpers::DefaultStressParams);
+
 	py::class_<ProcessStats>(m, "ProcessStats")
 		.def(py::init<>())
 		.def("update", &ProcessStats::Update)
diff --git a/catkit_core/TestHelpers.cpp b/catkit_core/TestHelpers.cpp
new file mode 100644
index 000000000..3c6c25b9f
--- /dev/null
+++ b/catkit_core/TestHelpers.cpp
@@ -0,0 +1,174 @@
+#include "TestHelpers.h"
+
+#include <utility>
+
+namespace TestHelpers
+{
+	// Returns the parameter set the stress tests start from: the in-class
+	// defaults of StressTestParams.
+	StressTestParams DefaultStressParams()
+	{
+		return StressTestParams{};
+	}
+
+	// Seeds the random generator owned by one stress-test thread.
+	ThreadRNG::ThreadRNG(uint64_t seed)
+		: m_Rng(seed),
+		m_Uniform01(0.0, 1.0)
+	{}
+
+	// Returns true with probability p (p is expected to lie in [0, 1]).
+	bool ThreadRNG::Chance(double p)
+	{
+		return m_Uniform01(m_Rng) < p;
+	}
+
+	// Returns a uniformly distributed value in [0, max). An empty range
+	// (max == 0) yields 0 rather than letting max - 1 wrap around.
+	size_t ThreadRNG::Pick(size_t max)
+	{
+		if (max == 0)
+		{
+			return 0;
+		}
+
+		std::uniform_int_distribution<size_t> dist(0, max - 1);
+		return dist(m_Rng);
+	}
+
+	// Stores the allocator and parameters and creates the shared transfer
+	// buffer with every slot marked empty.
+	template<typename Allocator>
+	AllocatorStressContext<Allocator>::AllocatorStressContext(
+		std::shared_ptr<Allocator> allocator,
+		const StressTestParams& params)
+		: m_Allocator(std::move(allocator)),
+		m_Params(params),
+		m_TransferBuffer(params.transfer_buffer_size)
+	{
+		for (auto& slot : m_TransferBuffer)
+		{
+			slot.store(AllocatorTraits<Allocator>::INVALID_HANDLE, std::memory_order_relaxed);
+		}
+	}
+
+	// Body of one stress-test thread. Meant to be called concurrently from
+	// several threads; tid is only used to seed the thread's RNG.
+	//
+	// Each step makes one allocation (short-lived or retained), then may
+	// release a random short-lived block and may pass one through the shared
+	// transfer buffer. A valid handle always has exactly one owner, either one
+	// thread's lists or a transfer slot, so every successful allocation gets
+	// exactly one release attempt by the time the last thread returns.
+	template<typename Allocator>
+	void AllocatorStressContext<Allocator>::RunThread(intptr_t tid)
+	{
+		using Traits = AllocatorTraits<Allocator>;
+
+		ThreadRNG rng(static_cast<uint64_t>(tid + 1) * 43);
+
+		size_t allocs = m_Params.iterations;
+		size_t retain = allocs / 2;
+
+		std::vector<Handle> data;
+		data.reserve(m_Params.data_buffer_size);
+
+		std::vector<Handle> retained;
+		retained.reserve(retain);
+
+		// Requests a block of the given size. A valid handle is appended to
+		// owned; an invalid one is counted as a failed allocation.
+		auto allocate_into = [this](std::vector<Handle>& owned, size_t size)
+		{
+			const Handle handle = Traits::Allocate(m_Allocator, size);
+
+			if (handle != Traits::INVALID_HANDLE)
+			{
+				owned.push_back(handle);
+				m_TotalAllocations.fetch_add(1, std::memory_order_relaxed);
+			}
+			else
+			{
+				m_FailedAllocations.fetch_add(1, std::memory_order_relaxed);
+			}
+		};
+
+		// Releases a handle unless it marks an empty slot. Every attempt counts
+		// as a release; a false result is additionally counted as a failed
+		// release (assumes Release() returns false on failure - TODO confirm).
+		auto release = [this](Handle handle)
+		{
+			if (handle == Traits::INVALID_HANDLE)
+			{
+				return;
+			}
+
+			if (!Traits::Release(m_Allocator, handle))
+			{
+				m_FailedReleases.fetch_add(1, std::memory_order_relaxed);
+			}
+			m_TotalReleases.fetch_add(1, std::memory_order_relaxed);
+		};
+
+		while (allocs > 0 || retain > 0)
+		{
+			// Short-lived allocations are chosen at random while both budgets
+			// last, and unconditionally once the retained budget is used up.
+			if (retain == 0 || (rng.Chance(m_Params.alloc_probability) && allocs > 0))
+			{
+				allocs--;
+				allocate_into(data, 1ULL << rng.Pick(m_Params.max_item_shift));
+			}
+			else
+			{
+				retain--;
+				allocate_into(retained, 1ULL << rng.Pick(m_Params.max_retained_shift));
+			}
+
+			if (rng.Chance(m_Params.free_probability) && !data.empty())
+			{
+				Handle& victim = data[rng.Pick(data.size())];
+				release(victim);
+				victim = Traits::INVALID_HANDLE;
+			}
+
+			if (rng.Chance(m_Params.transfer_probability) && !data.empty() && !m_TransferBuffer.empty())
+			{
+				Handle& ours = data[rng.Pick(data.size())];
+				auto& slot = m_TransferBuffer[rng.Pick(m_TransferBuffer.size())];
+
+				// Relaxed ordering suffices: only the handle value changes hands,
+				// this code never touches the memory a handle refers to.
+				const Handle theirs = slot.exchange(ours, std::memory_order_relaxed);
+
+				// The published handle now belongs to whoever takes it out of
+				// the slot, so drop our copy even when the slot was empty;
+				// keeping it would release the same block twice.
+				ours = Traits::INVALID_HANDLE;
+				release(theirs);
+			}
+		}
+
+		for (const Handle handle : retained)
+		{
+			release(handle);
+		}
+
+		for (const Handle handle : data)
+		{
+			release(handle);
+		}
+
+		// Drain the transfer buffer so handles parked there are not leaked.
+		// Every thread does this on exit; the exchange hands each handle to
+		// exactly one thread, and the last thread to finish leaves it empty.
+		for (auto& slot : m_TransferBuffer)
+		{
+			release(slot.exchange(Traits::INVALID_HANDLE, std::memory_order_relaxed));
+		}
+	}
+
+	template class AllocatorStressContext<PoolAllocator>;
+	template class AllocatorStressContext<BuddyAllocator>;
+	template class AllocatorStressContext<HybridPoolAllocator>;
+}
diff --git a/catkit_core/TestHelpers.h b/catkit_core/TestHelpers.h
new file mode 100644
index 000000000..eca1b4281
--- /dev/null
+++ b/catkit_core/TestHelpers.h
@@ -0,0 +1,161 @@
+#ifndef TEST_HELPERS_H
+#define TEST_HELPERS_H
+
+#include "PoolAllocator.h"
+#include "BuddyAllocator.h"
+#include "HybridPoolAllocator.h"
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <random>
+#include <vector>
+
+namespace TestHelpers
+{
+	// Parameters of the allocator stress test. The in-class initializers are
+	// the values returned by DefaultStressParams().
+	struct StressTestParams
+	{
+		// Number of threads the test driver should start; not read by RunThread().
+		size_t num_threads = 8;
+		// Short-lived allocations per thread; half as many retained ones are added.
+		size_t iterations = 1000;
+		// Short-lived requests have size 1 << k, k drawn from [0, max_item_shift).
+		size_t max_item_shift = 5;
+		// Retained requests have size 1 << k, k drawn from [0, max_retained_shift).
+		size_t max_retained_shift = 7;
+		// Chance per step of a short-lived rather than a retained allocation.
+		double alloc_probability = 0.5;
+		// Not read by RunThread().
+		double retain_probability = 0.25;
+		// Chance per step of releasing a randomly chosen short-lived block.
+		double free_probability = 0.66;
+		// Chance per step of passing a short-lived block through the transfer buffer.
+		double transfer_probability = 0.25;
+		// Number of slots in the transfer buffer shared by all threads.
+		size_t transfer_buffer_size = 128;
+		// Capacity reserved up front for each thread's short-lived handle list.
+		size_t data_buffer_size = 1000;
+	};
+
+	StressTestParams DefaultStressParams();
+
+	// Counters shared by all threads of one stress run, plus the thread entry point.
+	class StressTestContextBase
+	{
+	public:
+		virtual ~StressTestContextBase() = default;
+		virtual void RunThread(intptr_t tid) = 0;
+
+		size_t GetTotalAllocations() const { return m_TotalAllocations.load(std::memory_order_relaxed); }
+		size_t GetTotalReleases() const { return m_TotalReleases.load(std::memory_order_relaxed); }
+		size_t GetFailedAllocations() const { return m_FailedAllocations.load(std::memory_order_relaxed); }
+		size_t GetFailedReleases() const { return m_FailedReleases.load(std::memory_order_relaxed); }
+		size_t GetDoubleFreeDetected() const { return m_DoubleFreeDetected.load(std::memory_order_relaxed); }
+		// True as long as no double free has been recorded.
+		bool GetSuccess() const { return m_DoubleFreeDetected.load(std::memory_order_relaxed) == 0; }
+
+	protected:
+		std::atomic<size_t> m_TotalAllocations{0};
+		std::atomic<size_t> m_TotalReleases{0};
+		std::atomic<size_t> m_FailedAllocations{0};
+		std::atomic<size_t> m_FailedReleases{0};
+		// AllocatorStressContext::RunThread() never increments this counter.
+		std::atomic<size_t> m_DoubleFreeDetected{0};
+	};
+
+	// Gives every allocator type the same Allocate/Release interface and names
+	// its handle type and the value that marks "no handle".
+	template<typename Allocator>
+	struct AllocatorTraits;
+
+	template<>
+	struct AllocatorTraits<PoolAllocator>
+	{
+		using Handle = uint32_t;
+		static constexpr Handle INVALID_HANDLE = UINT32_MAX;
+
+		// PoolAllocator::Allocate() takes no size, so the request size is ignored.
+		static Handle Allocate(const std::shared_ptr<PoolAllocator>& alloc, size_t)
+		{
+			return alloc->Allocate();
+		}
+
+		static bool Release(const std::shared_ptr<PoolAllocator>& alloc, Handle handle)
+		{
+			return alloc->Release(handle);
+		}
+	};
+
+	template<>
+	struct AllocatorTraits<BuddyAllocator>
+	{
+		using Handle = size_t;
+		static constexpr Handle INVALID_HANDLE = 0;
+
+		static Handle Allocate(const std::shared_ptr<BuddyAllocator>& alloc, size_t size)
+		{
+			return alloc->Allocate(size);
+		}
+
+		static bool Release(const std::shared_ptr<BuddyAllocator>& alloc, Handle handle)
+		{
+			return alloc->Release(handle);
+		}
+	};
+
+	template<>
+	struct AllocatorTraits<HybridPoolAllocator>
+	{
+		using Handle = size_t;
+		static constexpr Handle INVALID_HANDLE = 0;
+
+		static Handle Allocate(const std::shared_ptr<HybridPoolAllocator>& alloc, size_t size)
+		{
+			return alloc->Allocate(size);
+		}
+
+		static bool Release(const std::shared_ptr<HybridPoolAllocator>& alloc, Handle handle)
+		{
+			return alloc->Release(handle);
+		}
+	};
+
+	// Random source owned by a single stress-test thread.
+	class ThreadRNG
+	{
+	public:
+		explicit ThreadRNG(uint64_t seed);
+		bool Chance(double p);
+		size_t Pick(size_t max);
+
+	private:
+		std::mt19937_64 m_Rng;
+		std::uniform_real_distribution<double> m_Uniform01;
+	};
+
+	// Stress-test context for one allocator type; see RunThread() in TestHelpers.cpp.
+	template<typename Allocator>
+	class AllocatorStressContext : public StressTestContextBase
+	{
+	public:
+		using Handle = typename AllocatorTraits<Allocator>::Handle;
+
+		AllocatorStressContext(std::shared_ptr<Allocator> allocator, const StressTestParams& params);
+		void RunThread(intptr_t tid) override;
+
+	private:
+		std::shared_ptr<Allocator> m_Allocator;
+		StressTestParams m_Params;
+		// Slots through which threads hand short-lived handles to each other.
+		std::vector<std::atomic<Handle>> m_TransferBuffer;
+	};
+
+	using PoolAllocatorStressContext = AllocatorStressContext<PoolAllocator>;
+	using BuddyAllocatorStressContext = AllocatorStressContext<BuddyAllocator>;
+	using HybridPoolAllocatorStressContext = AllocatorStressContext<HybridPoolAllocator>;
+}
+
+#endif // TEST_HELPERS_H
diff --git a/tests/test_allocator.py b/tests/test_allocator.py
index 8a2b03c8a..688a60043 100644
--- a/tests/test_allocator.py
+++ b/tests/test_allocator.py
@@ -1,11 +1,49 @@
-from catkit2.catkit_bindings import LocalMemory, HybridPoolAllocator, BuddyAllocator, PoolAllocator
+from catkit2.catkit_bindings import LocalMemory, HybridPoolAllocator, BuddyAllocator, PoolAllocator, test_helpers
 import pytest
+import threading
 
-DYNAMIC_CAPACITY = 1024 * 1024
+DYNAMIC_CAPACITY = 512 * 1024 * 1024
 MIN_SIZE = 16
 MIN_SIZE_POOL = 1024
 POOL_CAPACITY = 16
 
+STRESS_NUM_THREADS = 8
+STRESS_ITERATIONS = 20000
+
+def _run_stress(context_class, allocator):
+    """Run the C++ stress loop of `context_class` on STRESS_NUM_THREADS threads.
+
+    Returns the context so the caller can inspect its counters. The first
+    exception captured in a worker thread is re-raised here.
+    """
+    params = test_helpers.default_stress_params()
+    params.num_threads = STRESS_NUM_THREADS
+    params.iterations = STRESS_ITERATIONS
+
+    context = context_class(allocator, params)
+
+    exceptions = []
+
+    def run_with_exception_capture(tid):
+        try:
+            context.run_thread(tid)
+        except Exception as e:
+            exceptions.append(e)
+
+    threads = []
+    for i in range(params.num_threads):
+        t = threading.Thread(target=run_with_exception_capture, args=(i,))
+        threads.append(t)
+        t.start()
+
+    for t in threads:
+        t.join()
+
+    if exceptions:
+        raise exceptions[0]
+
+    return context
+
 @pytest.mark.parametrize("allocator_constructor", [
     lambda header: BuddyAllocator.create(header, DYNAMIC_CAPACITY, MIN_SIZE),
     lambda header: HybridPoolAllocator.create(header, DYNAMIC_CAPACITY, MIN_SIZE, MIN_SIZE_POOL),
@@ -43,3 +81,36 @@ def test_fixed_size_allocator(allocator_constructor):
     # Allocating more blocks than the pool capacity should raise an error.
     with pytest.raises(RuntimeError):
         allocator.allocate()
+
+def test_pool_allocator_stress():
+    memory = LocalMemory.create(1024 * 1024 * 512)
+    allocator = PoolAllocator.create(memory, 100000)
+
+    context = _run_stress(test_helpers.PoolAllocatorStressContext, allocator)
+
+    assert context.success, f"PoolAllocator stress test failed: double_frees={context.double_free_detected}"
+    assert context.total_allocations > 0, "PoolAllocator should have allocated some blocks"
+    assert context.total_releases > 0, "PoolAllocator should have released some blocks"
+    assert context.total_releases == context.total_allocations, "PoolAllocator should release every allocated block exactly once"
+
+def test_buddy_allocator_stress():
+    memory = LocalMemory.create(1024 * 1024 * 512)
+    allocator = BuddyAllocator.create(memory, DYNAMIC_CAPACITY, MIN_SIZE)
+
+    context = _run_stress(test_helpers.BuddyAllocatorStressContext, allocator)
+
+    assert context.success, f"BuddyAllocator stress test failed: double_frees={context.double_free_detected}"
+    assert context.total_allocations > 0, "BuddyAllocator should have allocated some blocks"
+    assert context.total_releases > 0, "BuddyAllocator should have released some blocks"
+    assert context.total_releases == context.total_allocations, "BuddyAllocator should release every allocated block exactly once"
+
+def test_hybrid_pool_allocator_stress():
+    memory = LocalMemory.create(1024 * 1024 * 512)
+    allocator = HybridPoolAllocator.create(memory, DYNAMIC_CAPACITY, MIN_SIZE, MIN_SIZE_POOL)
+
+    context = _run_stress(test_helpers.HybridPoolAllocatorStressContext, allocator)
+
+    assert context.success, f"HybridPoolAllocator stress test failed: double_frees={context.double_free_detected}"
+    assert context.total_allocations > 0, "HybridPoolAllocator should have allocated some blocks"
+    assert context.total_releases > 0, "HybridPoolAllocator should have released some blocks"
+    assert context.total_releases == context.total_allocations, "HybridPoolAllocator should release every allocated block exactly once"