Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions catkit2/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "Uuid.h"
#include "ArrayView.h"
#include "ProcessStats.h"
#include "TestHelpers.h"

#include "testbed.pb.h"

Expand Down Expand Up @@ -1293,6 +1294,67 @@ PYBIND11_MODULE(catkit_bindings, m)
.def("acquire", &HybridPoolAllocator::Acquire)
.def("release", &HybridPoolAllocator::Release);

// Stress test helpers for concurrency testing.
// Python creates threads and calls run_thread() on each context with GIL released.
py::module_ test_helpers = m.def_submodule("test_helpers", "Concurrency stress test helpers");

py::class_<TestHelpers::StressTestParams>(test_helpers, "StressTestParams")
.def(py::init<>())
.def_readwrite("num_threads", &TestHelpers::StressTestParams::num_threads)
.def_readwrite("iterations", &TestHelpers::StressTestParams::iterations)
.def_readwrite("max_item_shift", &TestHelpers::StressTestParams::max_item_shift)
.def_readwrite("max_retained_shift", &TestHelpers::StressTestParams::max_retained_shift)
.def_readwrite("alloc_probability", &TestHelpers::StressTestParams::alloc_probability)
.def_readwrite("retain_probability", &TestHelpers::StressTestParams::retain_probability)
.def_readwrite("free_probability", &TestHelpers::StressTestParams::free_probability)
.def_readwrite("transfer_probability", &TestHelpers::StressTestParams::transfer_probability)
.def_readwrite("transfer_buffer_size", &TestHelpers::StressTestParams::transfer_buffer_size)
.def_readwrite("data_buffer_size", &TestHelpers::StressTestParams::data_buffer_size);

py::class_<TestHelpers::PoolAllocatorStressContext>(test_helpers, "PoolAllocatorStressContext")
.def(py::init<std::shared_ptr<PoolAllocator>, const TestHelpers::StressTestParams&>())
.def("run_thread", [](TestHelpers::PoolAllocatorStressContext& self, intptr_t tid)
{
py::gil_scoped_release release;
self.RunThread(tid);
})
.def_property_readonly("total_allocations", &TestHelpers::PoolAllocatorStressContext::GetTotalAllocations)
.def_property_readonly("total_releases", &TestHelpers::PoolAllocatorStressContext::GetTotalReleases)
.def_property_readonly("failed_allocations", &TestHelpers::PoolAllocatorStressContext::GetFailedAllocations)
.def_property_readonly("failed_releases", &TestHelpers::PoolAllocatorStressContext::GetFailedReleases)
.def_property_readonly("double_free_detected", &TestHelpers::PoolAllocatorStressContext::GetDoubleFreeDetected)
.def_property_readonly("success", &TestHelpers::PoolAllocatorStressContext::GetSuccess);

py::class_<TestHelpers::BuddyAllocatorStressContext>(test_helpers, "BuddyAllocatorStressContext")
.def(py::init<std::shared_ptr<BuddyAllocator>, const TestHelpers::StressTestParams&>())
.def("run_thread", [](TestHelpers::BuddyAllocatorStressContext& self, intptr_t tid)
{
py::gil_scoped_release release;
self.RunThread(tid);
})
.def_property_readonly("total_allocations", &TestHelpers::BuddyAllocatorStressContext::GetTotalAllocations)
.def_property_readonly("total_releases", &TestHelpers::BuddyAllocatorStressContext::GetTotalReleases)
.def_property_readonly("failed_allocations", &TestHelpers::BuddyAllocatorStressContext::GetFailedAllocations)
.def_property_readonly("failed_releases", &TestHelpers::BuddyAllocatorStressContext::GetFailedReleases)
.def_property_readonly("double_free_detected", &TestHelpers::BuddyAllocatorStressContext::GetDoubleFreeDetected)
.def_property_readonly("success", &TestHelpers::BuddyAllocatorStressContext::GetSuccess);

py::class_<TestHelpers::HybridPoolAllocatorStressContext>(test_helpers, "HybridPoolAllocatorStressContext")
.def(py::init<std::shared_ptr<HybridPoolAllocator>, const TestHelpers::StressTestParams&>())
.def("run_thread", [](TestHelpers::HybridPoolAllocatorStressContext& self, intptr_t tid)
{
py::gil_scoped_release release;
self.RunThread(tid);
})
.def_property_readonly("total_allocations", &TestHelpers::HybridPoolAllocatorStressContext::GetTotalAllocations)
.def_property_readonly("total_releases", &TestHelpers::HybridPoolAllocatorStressContext::GetTotalReleases)
.def_property_readonly("failed_allocations", &TestHelpers::HybridPoolAllocatorStressContext::GetFailedAllocations)
.def_property_readonly("failed_releases", &TestHelpers::HybridPoolAllocatorStressContext::GetFailedReleases)
.def_property_readonly("double_free_detected", &TestHelpers::HybridPoolAllocatorStressContext::GetDoubleFreeDetected)
.def_property_readonly("success", &TestHelpers::HybridPoolAllocatorStressContext::GetSuccess);

test_helpers.def("default_stress_params", &TestHelpers::DefaultStressParams);

py::class_<ProcessStats>(m, "ProcessStats")
.def(py::init<>())
.def("update", &ProcessStats::Update)
Expand Down
167 changes: 167 additions & 0 deletions catkit_core/TestHelpers.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
#include "TestHelpers.h"

#include <vector>

namespace TestHelpers
{
StressTestParams DefaultStressParams()
{
StressTestParams params;
params.num_threads = 8;
params.iterations = 1000;
params.max_item_shift = 5;
params.max_retained_shift = 7;
params.alloc_probability = 0.5;
params.retain_probability = 0.25;
params.free_probability = 0.66;
params.transfer_probability = 0.25;
params.transfer_buffer_size = 128;
params.data_buffer_size = 1000;
return params;
}

ThreadRNG::ThreadRNG(uint64_t seed)
: m_Rng(seed),
m_Uniform01(0.0, 1.0)
{}

bool ThreadRNG::Chance(double p)
{
return m_Uniform01(m_Rng) < p;
}

size_t ThreadRNG::Pick(size_t max)
{
std::uniform_int_distribution<size_t> dist(0, max - 1);
return dist(m_Rng);
}

template<typename Allocator>
AllocatorStressContext<Allocator>::AllocatorStressContext(
std::shared_ptr<Allocator> allocator,
const StressTestParams& params)
: m_Allocator(std::move(allocator)),
m_Params(params),
m_TransferBuffer(params.transfer_buffer_size)
{
for (size_t i = 0; i < m_TransferBuffer.size(); ++i)
{
m_TransferBuffer[i].store(AllocatorTraits<Allocator>::INVALID_HANDLE, std::memory_order_relaxed);
}
}

template<typename Allocator>
void AllocatorStressContext<Allocator>::RunThread(intptr_t tid)
{
using Traits = AllocatorTraits<Allocator>;

ThreadRNG rng(static_cast<uint64_t>(tid + 1) * 43);

size_t allocs = m_Params.iterations;
size_t retain = allocs / 2;

std::vector<Handle> data;
data.reserve(m_Params.data_buffer_size);
size_t data_top = 0;

std::vector<Handle> retained;
retained.reserve(retain);
size_t retain_top = 0;

while (allocs > 0 || retain > 0)
{
if (retain == 0 || (rng.Chance(m_Params.alloc_probability) && allocs > 0))
{
allocs--;

size_t size = 1ULL << rng.Pick(m_Params.max_item_shift);
Handle handle = Traits::Allocate(m_Allocator, size);

if (handle != Traits::INVALID_HANDLE)
{
if (data_top >= data.size())
{
data.resize(data.size() + 1000);
}
data[data_top++] = handle;
m_TotalAllocations.fetch_add(1, std::memory_order_relaxed);
}
else
{
m_FailedAllocations.fetch_add(1, std::memory_order_relaxed);
}
}
else
{
size_t size = 1ULL << rng.Pick(m_Params.max_retained_shift);
Handle handle = Traits::Allocate(m_Allocator, size);

if (handle != Traits::INVALID_HANDLE)
{
if (retain_top >= retained.size())
{
retained.resize(retained.size() + 100);
}
retained[retain_top++] = handle;
m_TotalAllocations.fetch_add(1, std::memory_order_relaxed);
}
else
{
m_FailedAllocations.fetch_add(1, std::memory_order_relaxed);
}
retain--;
}

if (rng.Chance(m_Params.free_probability) && data_top > 0)
{
size_t idx = rng.Pick(data_top);
Handle handle = data[idx];

if (handle != Traits::INVALID_HANDLE)
{
Traits::Release(m_Allocator, handle);
m_TotalReleases.fetch_add(1, std::memory_order_relaxed);
data[idx] = Traits::INVALID_HANDLE;
}
}

if (rng.Chance(m_Params.transfer_probability) && data_top > 0)
{
size_t data_idx = rng.Pick(data_top);
size_t transfer_idx = rng.Pick(m_Params.transfer_buffer_size);

Handle our_handle = data[data_idx];
Handle their_handle = m_TransferBuffer[transfer_idx].exchange(our_handle, std::memory_order_relaxed);

if (their_handle != Traits::INVALID_HANDLE)
{
Traits::Release(m_Allocator, their_handle);
m_TotalReleases.fetch_add(1, std::memory_order_relaxed);
data[data_idx] = Traits::INVALID_HANDLE;
}
}
}

for (size_t i = 0; i < retain_top; i++)
{
if (retained[i] != Traits::INVALID_HANDLE)
{
Traits::Release(m_Allocator, retained[i]);
m_TotalReleases.fetch_add(1, std::memory_order_relaxed);
}
}

for (size_t i = 0; i < data_top; i++)
{
if (data[i] != Traits::INVALID_HANDLE)
{
Traits::Release(m_Allocator, data[i]);
m_TotalReleases.fetch_add(1, std::memory_order_relaxed);
}
}
}

template class AllocatorStressContext<PoolAllocator>;
template class AllocatorStressContext<BuddyAllocator>;
template class AllocatorStressContext<HybridPoolAllocator>;
}
140 changes: 140 additions & 0 deletions catkit_core/TestHelpers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#ifndef TEST_HELPERS_H
#define TEST_HELPERS_H

#include "PoolAllocator.h"
#include "BuddyAllocator.h"
#include "HybridPoolAllocator.h"

#include <cstdint>
#include <cstddef>
#include <memory>
#include <vector>
#include <atomic>
#include <random>

namespace TestHelpers
{
struct StressTestParams
{
size_t num_threads;
size_t iterations;
size_t max_item_shift;
size_t max_retained_shift;
double alloc_probability;
double retain_probability;
double free_probability;
double transfer_probability;
size_t transfer_buffer_size;
size_t data_buffer_size;
};

StressTestParams DefaultStressParams();

class StressTestContextBase
{
public:
virtual ~StressTestContextBase() = default;
virtual void RunThread(intptr_t tid) = 0;

size_t GetTotalAllocations() const { return m_TotalAllocations.load(std::memory_order_relaxed); }
size_t GetTotalReleases() const { return m_TotalReleases.load(std::memory_order_relaxed); }
size_t GetFailedAllocations() const { return m_FailedAllocations.load(std::memory_order_relaxed); }
size_t GetFailedReleases() const { return m_FailedReleases.load(std::memory_order_relaxed); }
size_t GetDoubleFreeDetected() const { return m_DoubleFreeDetected.load(std::memory_order_relaxed); }
bool GetSuccess() const { return m_DoubleFreeDetected.load(std::memory_order_relaxed) == 0; }

protected:
std::atomic<size_t> m_TotalAllocations{0};
std::atomic<size_t> m_TotalReleases{0};
std::atomic<size_t> m_FailedAllocations{0};
std::atomic<size_t> m_FailedReleases{0};
std::atomic<size_t> m_DoubleFreeDetected{0};
};

template<typename Allocator>
struct AllocatorTraits;

template<>
struct AllocatorTraits<PoolAllocator>
{
using Handle = uint32_t;
static constexpr Handle INVALID_HANDLE = UINT32_MAX;

static Handle Allocate(std::shared_ptr<PoolAllocator> alloc, size_t)
{
return alloc->Allocate();
}

static bool Release(std::shared_ptr<PoolAllocator> alloc, Handle handle)
{
return alloc->Release(handle);
}
};

template<>
struct AllocatorTraits<BuddyAllocator>
{
using Handle = size_t;
static constexpr Handle INVALID_HANDLE = 0;

static Handle Allocate(std::shared_ptr<BuddyAllocator> alloc, size_t size)
{
return alloc->Allocate(size);
}

static bool Release(std::shared_ptr<BuddyAllocator> alloc, Handle handle)
{
return alloc->Release(handle);
}
};

template<>
struct AllocatorTraits<HybridPoolAllocator>
{
using Handle = size_t;
static constexpr Handle INVALID_HANDLE = 0;

static Handle Allocate(std::shared_ptr<HybridPoolAllocator> alloc, size_t size)
{
return alloc->Allocate(size);
}

static bool Release(std::shared_ptr<HybridPoolAllocator> alloc, Handle handle)
{
return alloc->Release(handle);
}
};

class ThreadRNG
{
public:
explicit ThreadRNG(uint64_t seed);
bool Chance(double p);
size_t Pick(size_t max);

private:
std::mt19937_64 m_Rng;
std::uniform_real_distribution<double> m_Uniform01;
};

template<typename Allocator>
class AllocatorStressContext : public StressTestContextBase
{
public:
using Handle = typename AllocatorTraits<Allocator>::Handle;

AllocatorStressContext(std::shared_ptr<Allocator> allocator, const StressTestParams& params);
void RunThread(intptr_t tid) override;

private:
std::shared_ptr<Allocator> m_Allocator;
StressTestParams m_Params;
std::vector<std::atomic<Handle>> m_TransferBuffer;
};

using PoolAllocatorStressContext = AllocatorStressContext<PoolAllocator>;
using BuddyAllocatorStressContext = AllocatorStressContext<BuddyAllocator>;
using HybridPoolAllocatorStressContext = AllocatorStressContext<HybridPoolAllocator>;
}

#endif // TEST_HELPERS_H
Loading
Loading