Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions include/mpsc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ class MPSC {
MPSC() = default;

put_t put(const T &value) noexcept {
auto write_index = next_free_index_.load(std::memory_order_acquire);
alignas(64) thread_local static std::size_t reader_index_cache;
alignas(64) thread_local static std::size_t write_index;
do {
while (write_index > (consumer_.reader_index_.load(std::memory_order_relaxed) + common_.index_mask_)) {
while (write_index > (reader_index_cache + common_.index_mask_)) {
write_index = next_free_index_.load(std::memory_order_acquire);
reader_index_cache = consumer_.reader_index_.load(std::memory_order_relaxed);
if constexpr (std::is_same<PutWaitStrategy, ReturnImmediateStrategy>::value) {
return false;
} else {
common_.put_wait_.wait(
[this, write_index] { return write_index <= (consumer_.reader_index_.load(std::memory_order_relaxed) + common_.index_mask_); });
common_.put_wait_.wait([this] { return write_index <= (consumer_.reader_index_.load(std::memory_order_relaxed) + common_.index_mask_); });
}
}
} while (!next_free_index_.compare_exchange_strong(write_index, write_index + 1, std::memory_order_acq_rel, std::memory_order_acquire));
Expand All @@ -36,7 +38,7 @@ class MPSC {
// commit in the correct order to avoid problems
while (last_committed_index_.load(std::memory_order_relaxed) != write_index) {
// we don't return at this point even in case of ReturnImmediatelyStrategy as we've already taken the token
common_.put_wait_.wait([this, write_index] { return last_committed_index_.load(std::memory_order_relaxed) == write_index; });
common_.put_wait_.wait([this] { return last_committed_index_.load(std::memory_order_relaxed) == write_index; });
}

last_committed_index_.store(++write_index, std::memory_order_release);
Expand All @@ -50,7 +52,8 @@ class MPSC {
}

get_t get() noexcept {
while (consumer_.reader_index_2_ >= last_committed_index_.load(std::memory_order_relaxed)) {
while (consumer_.reader_index_2_ >= consumer_.last_committed_index_cache_) {
consumer_.last_committed_index_cache_ = last_committed_index_.load(std::memory_order_relaxed);
if constexpr (std::is_same<GetWaitStrategy, ReturnImmediateStrategy>::value) {
return std::nullopt;
} else {
Expand All @@ -66,7 +69,9 @@ class MPSC {
return contents;
}

// for testing only
void empty() noexcept {
consumer_.last_committed_index_cache_ = 0;
consumer_.reader_index_2_ = 0;
next_free_index_.store(0, std::memory_order_release);
last_committed_index_.store(0, std::memory_order_release);
Expand Down Expand Up @@ -98,7 +103,7 @@ class MPSC {
};

struct alignas(64) Consumer {
std::size_t next_free_index_cache_{0};
std::size_t last_committed_index_cache_{0};
std::size_t reader_index_2_{0};
std::atomic<std::size_t> reader_index_{0};
};
Expand Down
24 changes: 12 additions & 12 deletions include/wait_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,51 @@ template <typename Implementation>
class WaitStrategyInterface {
public:
template <class Predicate>
void wait(Predicate p) {}
void notify() {}
inline void wait(Predicate p) {}
inline void notify() {}
};

class ReturnImmediateStrategy : public WaitStrategyInterface<ReturnImmediateStrategy> {
public:
template <class Predicate>
void wait(Predicate p) {}
void notify() {}
inline void wait(Predicate p) {}
inline void notify() {}
};

class NoOpWaitStrategy : public WaitStrategyInterface<NoOpWaitStrategy> {
public:
template <class Predicate>
void wait(Predicate p) {}
void notify() {}
inline void wait(Predicate p) {}
inline void notify() {}
};

class PauseWaitStrategy : public WaitStrategyInterface<PauseWaitStrategy> {
public:
template <class Predicate>
void wait(Predicate p) {
inline void wait(Predicate p) {
cpu_pause();
}
void notify() {}
inline void notify() {}
};

class YieldWaitStrategy : public WaitStrategyInterface<YieldWaitStrategy> {
public:
template <class Predicate>
void wait(Predicate p) {
inline void wait(Predicate p) {
std::this_thread::yield();
}
void notify() {}
inline void notify() {}
};

class CVWaitStrategy : public WaitStrategyInterface<PauseWaitStrategy> {
public:
template <class Predicate>
void wait(Predicate p) {
inline void wait(Predicate p) {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait_for(lock, std::chrono::nanoseconds(100), p);
}

void notify() { cv_.notify_all(); }
inline void notify() { cv_.notify_all(); }

private:
std::condition_variable cv_;
Expand Down